GCS の Parquet データを BigQuery から参照してみた ~ Parquet データのロードと外部テーブルによる参照~

GCS の Parquet データを BigQuery から参照してみた ~ Parquet データのロードと外部テーブルによる参照~

Clock Icon2020.04.24

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

やりたいこと

  • GCS に配置した Parquet データを BigQuery で参照したい
  • BigQuery に Parquet データをロードしたい
  • BigQuery に Parquet データをロードする場合、意図通りのデータ型を自動検出してテーブル作成してくれるかどうか確認したい
  • BigQuery に Parquet データをロードする場合の制限事項を知りたい
  • パーティショニングされた Parquet データを BigQuery から参照するにはどうすればいいのか知りたい

Parquet データを準備

以下のサイトで動作確認用の CSV データを作成しました。

できたサンプルデータはこんな感じです。

id,name,kana,postal,tel,mail,password,ip,url,create_timestamp,reg_date,valid,bit,code
1,大和 貫一,ヤマト カンイチ,262-8517,080-6081-7739,pzRz6cx6B@test.org,mp5x_0uQ,150.110.95.198,http://sample.net,1988/2/2 7:09:28,1991/1/28,False,0,2607
2,篠原 茂志,シノハラ シゲシ,025-6473,090-7899-2524,BWeLhiJPN@test.jp,rzFsiiYl,16.169.224.193,http://test.jp,2008/11/15 8:42:14,1972/4/25,True,0,6750
3,近江 智恵子,オウミ チエコ,129-0310,080-6093-0838,qDnzBhS@test.net,QAOoF2NK,161.55.51.87,http://test.com,2016/4/14 18:47:33,2000/8/19,True,1,4208
4,藤原 長治,フヂワラ チョウジ,138-8408,090-7532-4618,CGxHpmaS2@sample.net,uwSE1kHr,171.132.206.6,http://sample.net,2018/11/18 12:18:54,1972/3/4,True,0,8333
5,湯川 秀明,ユカワ ヒデアキ,326-4046,080-1220-3065,IgJh4@example.com,lOuOvfr1,247.118.177.127,http://test.net,2015/11/26 23:13:43,1981/4/2,False,0,3527
6,仁平 章,ヒトヘイ アキラ,142-2086,080-8072-0869,IPMKUcb9J@test.co.jp,FkLBw3NQ,94.42.204.93,http://sample.com,2004/5/27 19:59:07,2012/5/17,True,0,7302
7,芦沢 治,アシザワ オサム,400-1426,080-1336-6892,R2wkyu@sample.net,orBU7WTv,15.158.107.210,http://sample.com,2008/10/28 18:02:33,1973/8/5,False,1,5045
8,甲田 隆之,コウダ タカユキ,001-5573,090-1434-8132,R8kzaAnh@sample.net,PbqK_QQf,84.66.207.170,http://example.net,2015/7/10 15:22:05,1972/11/4,False,0,3327
9,萩原 佳奈,ハギワラ ヨシナ,979-5021,090-8108-0632,XZcZdl0@test.co.jp,AR_OdmXO,13.55.171.197,http://example.com,2015/12/5 12:16:58,1987/3/13,False,1,279
10,大岡 桃香,オオオカ モモカ,691-8324,090-2101-2791,BTZSU@example.co.jp,J4rR75pj,11.97.118.39,http://sample.net,1993/1/25 8:29:46,2009/4/4,True,0,5110

※データ型やマルチバイト文字確認のために個人情報っぽい CSV になりましたが、全てツールで作成した疑似データです。

続いて、以下の Python コードで CSV → Parquet に変換しました。

import argparse
import os.path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

parser = argparse.ArgumentParser(description='conv file2parquet')
parser.add_argument('file', help='file name')
args = parser.parse_args()
file_name = os.path.splitext(os.path.basename(args.file))[0]
file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), args.file)

df = pd.read_csv(file_path)
table = pa.Table.from_pandas(df)
pq.write_table(table, './{}.parquet'.format(file_name))

Parquet データを BigQuery にロード

作成した Parquet ファイルを GCS に配置して、BigQuery にロードしてみます。

Python クライアントライブラリを使用した以下のコードを実行しました。

from google.cloud import bigquery

client = bigquery.Client()
dataset_id = 'dataset_2'

dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.PARQUET
uri = "gs://test-mikami/data_test/sample.parquet"
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("load_parquet"), job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print("Job finished.")

destination_table = client.get_table(dataset_ref.table("load_parquet"))
print("Loaded {} rows.".format(destination_table.num_rows))
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet.py
Starting job d3bf92f2-3b30-4a19-b9b9-ef6ab8f5503e
Job finished.
Loaded 10 rows.

念のためデータを確認してみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq query --use_legacy_sql=false 'SELECT * FROM `cm-da-mikami-yuki-258308`.dataset_2.load_parquet ORDER BY id'
Waiting on bqjob_r6503d3677d28bcbd_00000171a0414e50_1 ... (0s) Current status: DONE
+----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+
| id |     name    |       kana        |  postal  |      tel      |         mail         | password |       ip        |        url         |  create_timestamp   | reg_date  | valid | bit | code |
+----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+
|  1 | 大和 貫一   | ヤマト カンイチ   | 262-8517 | 080-6081-7739 | pzRz6cx6B@test.org   | mp5x_0uQ | 150.110.95.198  | http://sample.net  | 1988/2/2 7:09:28    | 1991/1/28 | false |   0 | 2607 |
|  2 | 篠原 茂志   | シノハラ シゲシ   | 025-6473 | 090-7899-2524 | BWeLhiJPN@test.jp    | rzFsiiYl | 16.169.224.193  | http://test.jp     | 2008/11/15 8:42:14  | 1972/4/25 |  true |   0 | 6750 |
|  3 | 近江 智恵子 | オウミ チエコ     | 129-0310 | 080-6093-0838 | qDnzBhS@test.net     | QAOoF2NK | 161.55.51.87    | http://test.com    | 2016/4/14 18:47:33  | 2000/8/19 |  true |   1 | 4208 |
|  4 | 藤原 長治   | フヂワラ チョウジ | 138-8408 | 090-7532-4618 | CGxHpmaS2@sample.net | uwSE1kHr | 171.132.206.6   | http://sample.net  | 2018/11/18 12:18:54 | 1972/3/4  |  true |   0 | 8333 |
|  5 | 湯川 秀明   | ユカワ ヒデアキ   | 326-4046 | 080-1220-3065 | IgJh4@example.com    | lOuOvfr1 | 247.118.177.127 | http://test.net    | 2015/11/26 23:13:43 | 1981/4/2  | false |   0 | 3527 |
|  6 | 仁平 章     | ヒトヘイ アキラ   | 142-2086 | 080-8072-0869 | IPMKUcb9J@test.co.jp | FkLBw3NQ | 94.42.204.93    | http://sample.com  | 2004/5/27 19:59:07  | 2012/5/17 |  true |   0 | 7302 |
|  7 | 芦沢 治     | アシザワ オサム   | 400-1426 | 080-1336-6892 | R2wkyu@sample.net    | orBU7WTv | 15.158.107.210  | http://sample.com  | 2008/10/28 18:02:33 | 1973/8/5  | false |   1 | 5045 |
|  8 | 甲田 隆之   | コウダ タカユキ   | 001-5573 | 090-1434-8132 | R8kzaAnh@sample.net  | PbqK_QQf | 84.66.207.170   | http://example.net | 2015/7/10 15:22:05  | 1972/11/4 | false |   0 | 3327 |
|  9 | 萩原 佳奈   | ハギワラ ヨシナ   | 979-5021 | 090-8108-0632 | XZcZdl0@test.co.jp   | AR_OdmXO | 13.55.171.197   | http://example.com | 2015/12/5 12:16:58  | 1987/3/13 | false |   1 |  279 |
| 10 | 大岡 桃香   | オオオカ モモカ   | 691-8324 | 090-2101-2791 | BTZSU@example.co.jp  | J4rR75pj | 11.97.118.39    | http://sample.net  | 1993/1/25 8:29:46   | 2009/4/4  |  true |   0 | 5110 |
+----+-------------+-------------------+----------+---------------+----------------------+----------+-----------------+--------------------+---------------------+-----------+-------+-----+------+

正常にロードできています。

テーブルスキーマを確認してみると bool 型は自動判定してくれましたが、TIMESTAMP 型を判定してくれてないようです。。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq show --schema --format=prettyjson cm-da-mikami-yuki-258308:dataset_2.load_parquet
[
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "kana",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "postal",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "tel",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "mail",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "password",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ip",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "create_timestamp",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "reg_date",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "valid",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "bit",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "code",
    "type": "INTEGER"
  }
]

日付フォーマットの問題かと、元の CSV ファイル日付項目もフォーマットをハイフン( - )区切りに変更してみましたが、一向に TIMESTAMP 型で検出してくれず。。

ドキュメント確認すると、Parquet → BigQuery ロード時の型変換に関する記載がありました。

ということは、Parquet ファイルできちんと型定義ができていなかった?

CSV から Parquet に変換する Python コードに Parquet のデータ型定義を追加して、再度 Parquet ファイルを作成しました。

(省略)
column_types = {
    'id': int64(),
    'name': string(),
    'kana': string(), 
    'postal': string(), 
    'tel': string(), 
    'password': string(), 
    'ip': string(), 
    'url': string(),
    'create_timestamp': timestamp('s'),
    'reg_date': timestamp('s'),
    'valid': bool_(),
    'bit': float32(),
    'code': float64(),
}
(省略)

実行してみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet.py
Starting job 5e0aee11-aa45-4e5a-8514-75e64bbff435
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq query --use_legacy_sql=false 'SELECT create_timestamp, reg_date FROM `cm-da-mikami-yuki-258308`.dataset_2.load_parquet_3 ORDER BY id'
Waiting on bqjob_r5b022bd287d25855_00000171a0e6b1da_1 ... (0s) Current status: DONE
+---------------------+---------------------+
|  create_timestamp   |      reg_date       |
+---------------------+---------------------+
| 1988-02-02 07:09:28 | 1991-01-28 00:00:00 |
| 2008-11-15 08:42:14 | 1972-04-25 00:00:00 |
| 2016-04-14 18:47:33 | 2000-08-19 00:00:00 |
| 2018-11-18 12:18:54 | 1972-03-04 00:00:00 |
| 2015-11-26 23:13:43 | 1981-04-02 00:00:00 |
| 2004-05-27 19:59:07 | 2012-05-17 00:00:00 |
| 2008-10-28 18:02:33 | 1973-08-05 00:00:00 |
| 2015-07-10 15:22:05 | 1972-11-04 00:00:00 |
| 2015-12-05 12:16:58 | 1987-03-13 00:00:00 |
| 1993-01-25 08:29:46 | 2009-04-04 00:00:00 |
+---------------------+---------------------+
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ bq show --schema --format=prettyjson cm-da-mikami-yuki-258308:dataset_2.load_parquet_3
[
  {
    "mode": "NULLABLE",
    "name": "id",
    "type": "INTEGER"
  },
  {
    "mode": "NULLABLE",
    "name": "name",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "kana",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "postal",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "tel",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "mail",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "password",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ip",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "create_timestamp",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "reg_date",
    "type": "TIMESTAMP"
  },
  {
    "mode": "NULLABLE",
    "name": "valid",
    "type": "BOOLEAN"
  },
  {
    "mode": "NULLABLE",
    "name": "bit",
    "type": "FLOAT"
  },
  {
    "mode": "NULLABLE",
    "name": "code",
    "type": "FLOAT"
  }
]

今度は、きちんと Parquet で定義したデータ型通りにテーブルスキーマが自動作成されました。

圧縮した Parquet データを BigQuery にロード

ドキュメントによると、Parquet データでは、以下の圧縮形式をサポートしているとのことです。

  • Snappy
  • GZip
  • LZO_1C and LZO_1X

SNAPPYGZIPLZ4ZSTD 4つの圧縮形式のファイルを準備しました。

それぞれ、BigQuery にロードしてみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py SNAPPY
Starting job d07720c7-107d-43bd-b08e-ff40caddca53
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py GZIP
Starting job 558c2d7d-4970-4591-b787-3442b811106c
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py LZ4
Starting job a6406969-5e65-4ec7-a932-0488ea8d7368
Job finished.
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_compress.py ZSTD
Starting job 6bd4a1a4-705a-423e-9ea0-cc6ab258735b
Job finished.
Loaded 10 rows.

LZ4ZSTD はドキュメントには記載がなかった形式ですが、ロード成功したようです。

データ内容も確認しましたが、問題なくロードできていました。

ドキュメントに記載のない圧縮形式でもロードはできましたが、データ型のバリエーションなど十分なサンプルデータではないので、サポート明記のある圧縮形式データを使用した方が無難でしょうか。

CSV と Parquet データロード時間を比較

体感的に、Parquet データのロードの場合、CSV や JSON データをロードした時よりも時間がかかっているように感じました。

ロードファイルが CSV の場合と、圧縮なしの Parquet の場合、SNAPPY 圧縮の Parquet の場合で、ロード処理処理時間にどのくらい差分があるのか確認してみます。

データロード用の Python コードを、テーブルがない場合は新規作成、ある場合は追記モードでロードするよう変更し、ロード完了のタイミングで処理時間を出力するよう修正しました。

from google.cloud import bigquery
import time

t1 = time.time()
(省略)
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("load_csv"), job_config=job_config
)  # API request
print("Starting job {}".format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print("Job finished. -> {} sec".format(time.time()-t1))

destination_table = client.get_table(dataset_ref.table("load_csv"))
print("Loaded {} rows.".format(destination_table.num_rows))

まずは CSV ファイルをロードします。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 13369eaa-2784-4b52-8037-32ffccd9a883
Job finished. -> 1.900064468383789 sec
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 0b222a3e-306a-4b1d-9622-6fd9af32d3d6
Job finished. -> 2.299731731414795 sec
Loaded 20 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 7562ade7-a855-4347-bf4e-9221526df9ca
Job finished. -> 1.862321376800537 sec
Loaded 30 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 3363bb79-65a5-4d0b-831b-17bb560c377c
Job finished. -> 1.6943325996398926 sec
Loaded 40 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_csv.py
Starting job 77b740d4-4c8a-46fb-8acd-3bcc5bb89c48
Job finished. -> 1.9005563259124756 sec
Loaded 50 rows.

クライアントライブラリ経由なので API 通信が必要になり、ネットワーク速度の影響もあるため純粋にデータロード時間だけというわけではありませんが、CSV ファイルの場合、ロード時間はだいたい 2 秒ほどでした。

次に、圧縮なしの Parquet ファイルを同様にロードしてみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job 0426ca2b-e9f2-4fe0-a90d-a804b4272d01
Job finished. -> 8.229813814163208 sec
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job 7837e5a7-fe70-43cf-8538-75740b48be0b
Job finished. -> 8.890910625457764 sec
Loaded 20 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job b8a69558-b8e6-4ef8-b783-f0bf5b24d8c1
Job finished. -> 14.763366937637329 sec
Loaded 30 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job ef34757c-af56-47e3-aa5e-58ed0282f879
Job finished. -> 6.311228036880493 sec
Loaded 40 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_none.py
Starting job de632b3b-e65c-4c48-bbdf-20d964becc99
Job finished. -> 9.726810693740845 sec
Loaded 50 rows.

API通信の問題か外れ値はあるものの、約 8 秒ほどかかっています。。

最後に、SNAPPY 圧縮の Parquet データです。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 0a9c59a2-5b37-440f-a5c1-402fa37c29e0
Job finished. -> 11.509429454803467 sec
Loaded 10 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 72733262-a77c-445f-810b-374169abf989
Job finished. -> 6.40911602973938 sec
Loaded 20 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 091a6961-3420-4cf6-be80-3da06206522f
Job finished. -> 6.870298385620117 sec
Loaded 30 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job 8c3061ca-d7b6-4248-8b7f-e24dcba9496f
Job finished. -> 10.682614088058472 sec
Loaded 40 rows.
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_snappy.py
Starting job a12066af-c910-4dbb-97be-38a46f3e9fc2
Job finished. -> 11.705443859100342 sec
Loaded 50 rows.

圧縮なしの Parquet よりも、1 秒くらい多く時間がかかっているようです。。

列指向の BigQuery なので、同じく列指向フォーマットの Parquet の方がロード処理が速いかと思いましたが、データロード時にはおそらく型変換などのために一度ファイルデータをパースする必要があるため、シンプルなフォーマットのソースデータファイルの方がより高速に処理できるようです。

GCS 上でパティショニングされた Parquet ファイルをロード

Hive や Spark で使用される Parquet ファイルは、ディレクトリパスに年、月、日などの指定を含むパーティショニング状態で配置されることが多いと思います。

BigQuery へのデータロードでは、Parquet に限らず、パーティショニングデータのロードをオプションで指定することができます。

以下の、hive_partitioning オプション指定のある Python コードで Parquet データをロードしてみます。

※2020/04 現在、Python クライアントライブラリでは hive_partitioning オプションはベータ版とのことです。

(省略)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.PARQUET
opt = bigquery.external_config.HivePartitioningOptions()
opt.mode = 'AUTO'
opt.source_uri_prefix = 'gs://test-mikami/data_test/ext_parquet/'
job_config.hive_partitioning = opt
uri = 'gs://test-mikami/data_test/ext_parquet/*'
load_job = client.load_table_from_uri(
    uri, dataset_ref.table("load_parquet_partition"), job_config=job_config
)  # API request
(省略)

GCS には、/dt=2020-04-21//dt=2020-04-22//dt=2020-04-23/ の3つのパスの配下に、それぞれ1ファイルずつ配置してある状態です。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ gsutil ls -r gs://test-mikami/data_test/ext_parquet
gs://test-mikami/data_test/ext_parquet/:
gs://test-mikami/data_test/ext_parquet/

gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/sample_p1.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/sample_p2.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/sample_p3.parquet

実行してみます。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py
Starting job 1c9fcdf3-89dd-4a43-9323-0c114eb87451
Job finished.
Loaded 10 rows.

パス分割されていた全ての配置ファイルのデータがロードされていることが確認できました。

hive_partitioning オプションを指定する場合、Parquet ファイルの配置パスは Hive パーティショニングレイアウトに従う必要があります。

試しに、パスが不正なファイルを追加してみます。

uri_prefix の直下と dt=YYYY-MM-DD を含まないパスの下にも、Parquet ファイルを配置しました。

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ gsutil ls -r gs://test-mikami/data_test/ext_parquet
gs://test-mikami/data_test/ext_parquet/:
gs://test-mikami/data_test/ext_parquet/
gs://test-mikami/data_test/ext_parquet/sample_p1.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-21/sample_p1.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-22/sample_p2.parquet

gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/:
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/
gs://test-mikami/data_test/ext_parquet/dt=2020-04-23/sample_p3.parquet

gs://test-mikami/data_test/ext_parquet/work/:
gs://test-mikami/data_test/ext_parquet/work/
gs://test-mikami/data_test/ext_parquet/work/sample_p1.parquet

ロード実行してみると

(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py
Starting job 8bf21632-6f36-432e-aa2b-14290245d773
Traceback (most recent call last):
  File "load_parquet_partition.py", line 21, in <module>
    load_job.result()  # Waits for table load to complete.
  File "/home/ec2-user/test_bq/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 818, in result
    return super(_AsyncJob, self).result(timeout=timeout)
  File "/home/ec2-user/test_bq/lib/python3.7/site-packages/google/api_core/future/polling.py", line 127, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Partition keys should be invariant from table creation across all partitions, with the number of partition keys held constant with invariant names.  Expected 0 partition keys ([]), but 1 ([dt]) were encountered along path /bigstore/test-mikami/data_test/ext_parquet/dt=2020-04-22.

パーティションキーが不正だと怒られました。。

なお、ロード対象のソースデータファイルをワイルドカードで指定すれば、パーティションキーにかかわらず、複数ファイルを一度にロードすることもできます。

(省略)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.source_format = bigquery.SourceFormat.PARQUET
#opt = bigquery.external_config.HivePartitioningOptions()
#opt.mode = 'AUTO'
#opt.source_uri_prefix = 'gs://test-mikami/data_test/ext_parquet/'
#job_config.hive_partitioning = opt
uri = 'gs://test-mikami/data_test/ext_parquet/*'
load_job = client.load_table_from_uri(
#    uri, dataset_ref.table("load_parquet_partition"), job_config=job_config
    uri, dataset_ref.table("load_parquet_multi"), job_config=job_config
)  # API request
(省略)
(test_bq) [ec2-user@ip-10-0-43-239 test_load]$ python load_parquet_partition.py
Starting job 1e93d04f-ead5-4dbe-8b6f-28adae1aef43
Job finished.
Loaded 16 rows.

ファイルストレージ上のパーティショニングレイアウトに一致するファイルのみをロードしたい場合は、パーティショニングオプションを指定しておくと良さそうです。

パーティショニングされた Parquet ファイルを参照する外部テーブルを作成

BigQuery では、GCS 上のファイルなどの外部リソースに対して、通常のテーブル同様に SQL クエリを実行することができます。

先ほどは GCS から BigQuery にデータをロードして参照しましたが、GCS にパーティショニングされている Parquet ファイルをロードなしで直接参照する、外部テーブルを定義してみます。

※現時点では、Python クライアントライブラリ経由での Parquet ファイルの外部テーブル作成は未サポートのようです。

BigQuery 管理画面から、「ソースデータパーティショニング」を指定して、「外部テーブル」を作成しました。

SQL を実行してデータを確認してみます。

複数のパスに配置された全ての Parquet ファイルのデータが参照可能なことが確認できました。

外部テーブル作成後に GCS に dt=YYYY-MM-DD フォーマットの新しいパスを作成し、Parquet ファイルを追加して、再度 BigQuery テーブルに SQL クエリを実行してみます。

後から追加した Parquet ファイルのデータも BigQuery の外部テーブルで参照できることが確認できました。

まとめ(所感)

特にデータ分析業務では Parquet ファイルを扱うケースも多々あるかと思います。

BigQuery で外部テーブルを定義しておけば、例えば他のシステムからデイリーで Parquet ファイルが出力される場合など、ロード処理なしに GCS 上のパーティショニングデータを BigQuery のテーブルデータとして参照できるので、便利なのではないかと思いました。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.